धाराएँ क्या हैं?
Node.js , , .
इन्हें कन्वेयर बेल्ट के रूप में सोचें जो डेटा को एक स्थान से दूसरे स्थान तक ले जाते हैं, जिससे आपको पूरे डेटा सेट की प्रतीक्षा करने के बजाय प्रत्येक टुकड़े पर काम करने की अनुमति मिलती है।
स्ट्रीम Node.js की सबसे शक्तिशाली विशेषताओं में से एक हैं और इनका व्यापक रूप से उपयोग किया जाता है:
फ़ाइल स्वरूप फ़ंक्शंस
फ़ाइलें ले जाना/लिखना
HTTP अनुरोध और प्रतिक्रियाएँ
वेब अनुरोधों और प्रतिक्रियाओं को संभालना
डेटा संपीड़न और विस्तार
डेटा को संक्षिप्त और विस्तृत करें
डेटाबेस संचालन
डेटाबेस संचालन को संभालना
वास्तविक समय डेटा प्रोसेसिंग
वास्तविक समय डेटा विनिमय और प्रसंस्करण
स्ट्रीम के साथ शुरुआत करना
Node.js में डेटा के कुशल हेरफेर के लिए स्ट्रीम मूलभूत अवधारणाओं में से एक है।
वे आपको डेटा को उपलब्ध होने पर टुकड़ों में संसाधित करने की अनुमति देते हैं, बजाय एक ही बार में सब कुछ मेमोरी में लोड करने के।
मूल स्ट्रीम उदाहरण
const fs = require('fs');
// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt', 'utf8');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');
// Pipe the data from readable to writable stream
readableStream.pipe(writableStream);
// Handle completion and errors
writableStream.on('finish', () => {
console.log('File copy completed!');
});
readableStream.on('error', (err) => {
console.error('Error reading file:', err);
});
writableStream.on('error', (err) => {
console.error('Error writing file:', err);
});
स्ट्रीम का उपयोग क्यों करें?
स्ट्रीम का उपयोग करने के कई लाभ हैं:
धाराओं के लाभ:
512MB RAM वाले सर्वर पर 1GB फ़ाइल ले जाने की कल्पना करें:
- धाराओं के बिना:संपूर्ण फ़ाइल को मेमोरी में लोड करने का प्रयास करते समय प्रक्रिया क्रैश हो जाती है
- धाराओं के साथ:आप फ़ाइल को छोटे-छोटे टुकड़ों में संसाधित करते हैं (उदाहरण के लिए, एक समय में 64KB)।
महत्वपूर्ण स्ट्रीम प्रकार
Node.js , :
| स्ट्रीम प्रकार | व्याख्या | सामान्य उदाहरण |
|---|---|---|
| Readable | स्ट्रीम जो डेटा पास कर सकती हैं (डेटा द्वारा) | fs.createReadStream(), HTTP , process.stdin |
| Writable | स्ट्रीम जिसमें डेटा लिखा जा सकता है (डेटा गंतव्य) | fs.createWriteStream(), HTTP , process.stdout |
| Duplex | स्ट्रीम पढ़ने योग्य और लिखने योग्य दोनों हैं | टीसीपी सॉकेट, ज़्लिब स्ट्रीम |
| Transform | डुप्लेक्स स्ट्रीम जो डेटा को लिखते और प्रसारित करते समय बदल सकती है | ज़्लिब धाराएँ, क्रिप्टो धाराएँ |
नोट:
Node.js EventEmitter , .
पठनीय धाराएँ
पठनीय धाराएँ आपको किसी स्रोत से डेटा पढ़ने की अनुमति देती हैं। उदाहरण:
- किसी फ़ाइल से हटना
- ब्रांचेंड पर HTTP प्रतिक्रियाएँ
- सर्वर से HTTP अनुरोध
- process.stdin
एक पठनीय स्ट्रीम बनाना
const fs = require('fs');
// Create a readable stream from a file
const readableStream = fs.createReadStream('myfile.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks
});
// Events for readable streams
readableStream.on('data', (chunk) => {
console.log(`Received ${chunk.length} bytes of data.`);
console.log(chunk);
});
readableStream.on('end', () => {
console.log('No more data to read.');
});
readableStream.on('error', (err) => {
console.error('Error reading from stream:', err);
});
कदम बढ़ाने के तरीके
पठनीय धाराएँ दो तरीकों में से एक में काम करती हैं:
- Flowing Mode:डेटा को स्रोत से तुरंत पार्स किया जाता है और ईवेंट का उपयोग करके आपके एप्लिकेशन तक पहुंचाया जाता है
- Paused Mode:स्ट्रीम से डेटा का बड़ा हिस्सा प्राप्त करने के लिए आपको स्पष्ट रूप से Stream.read() को कॉल करना होगा
const fs = require('fs');
// Paused mode example
const readableStream = fs.createReadStream('myfile.txt', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // 64KB chunks
});
// Manually consume the stream using read()
readableStream.on('readable', () => {
let chunk;
while (null !== (chunk = readableStream.read())) {
console.log(`Read ${chunk.length} bytes of data.`);
console.log(chunk);
}
});
readableStream.on('end', () => {
console.log('No more data to read.');
});
लिखने योग्य धाराएँ
लिखने योग्य स्ट्रीम आपको किसी गंतव्य पर डेटा लिखने की अनुमति देती हैं। उदाहरण:
- किसी फ़ाइल में लिखना
- ब्रांचएंड पर HTTP अनुरोध
- सर्वर पर HTTP प्रतिक्रियाएँ
- process.stdout
एक लिखने योग्य स्ट्रीम बनाना
const fs = require('fs');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output.txt');
// Write data to the stream
writableStream.write('Hello, ');
writableStream.write('World!');
writableStream.write('\nWriting to a stream is easy!');
// End the stream
writableStream.end();
// Events for writable streams
writableStream.on('finish', () => {
console.log('All data has been written to the file.');
});
writableStream.on('error', (err) => {
console.error('Error writing to stream:', err);
});
अभिघातज के बाद के तनाव का प्रबंधन
स्ट्रीम पर लिखते समय, विलंबता तब होती है जब डेटा संसाधित होने की तुलना में तेज़ी से लिखा जाता है।
राइट() विधि एक बूलियन मान लौटाती है जो यह दर्शाती है कि लिखना जारी रखना सुरक्षित है या नहीं।
const fs = require('fs');
const writableStream = fs.createWriteStream('output.txt');
function writeData() {
let i = 100;
function write() {
let ok = true;
do {
i--;
if (i === 0) {
// Last time, close the stream
writableStream.write('Last chunk!\n');
writableStream.end();
} else {
// Continue writing data
const data = `Data chunk ${i}\n`;
// Write and check if we should continue
ok = writableStream.write(data);
}
}
while (i > 0 && ok);
if (i > 0) {
// We need to wait for the drain event before writing more
writableStream.once('drain', write);
}
}
write();
}
writeData();
writableStream.on('finish', () => {
console.log('All data written successfully.');
});
Pipe
पाइप() विधि एक पठनीय स्ट्रीम को एक लिखने योग्य स्ट्रीम से जोड़ती है, स्वचालित रूप से डेटा के प्रवाह को प्रबंधित करती है, और बाद के संपीड़न को संभालती है।
यह धाराओं का उपभोग करने का एक आसान तरीका है।
const fs = require('fs');
// Create readable and writable streams
const readableStream = fs.createReadStream('source.txt');
const writableStream = fs.createWriteStream('destination.txt');
// Pipe the readable stream to the writable stream
readableStream.pipe(writableStream);
// Handle completion and errors
readableStream.on('error', (err) => {
console.error('Read error:', err);
});
writableStream.on('error', (err) => {
console.error('Write error:', err);
});
writableStream.on('finish', () => {
console.log('File copy completed!');
});
चेन पाइप
आप पाइप() का उपयोग करके कई स्ट्रीम को एक साथ जोड़ सकते हैं।
ट्रांसफ़ॉर्म स्ट्रीम के साथ काम करते समय यह विशेष रूप से उपयोगी है।
const fs = require('fs');
const zlib = require('zlib');
// Create a pipeline to read a file, compress it, and write to a new file
fs.createReadStream('source.txt')
.pipe(zlib.createGzip()) // Compress the data
.pipe(fs.createWriteStream('destination.txt.gz'))
.on('finish', () => {
console.log('File compressed successfully!');
});
नोट:
पाइप() विधि गंतव्य स्ट्रीम लौटाती है, जो श्रृंखला चलाती है।
डुप्लेक्स और ट्रांसफ़ॉर्म धाराएँ
द्वैध धाराएँ
डुप्लेक्स स्ट्रीम दो-तरफा पाइप की तरह पढ़ने योग्य और लिखने योग्य दोनों हैं।
टीसीपी सॉकेट डुप्लेक्स स्ट्रीम का एक अच्छा उदाहरण है।
const net = require('net');
// Create a TCP server
const server = net.createServer((socket) => {
// 'socket' is a duplex stream
// Handle incoming data (readable side)
socket.on('data', (data) => {
console.log('Received:', data.toString());
// Echo back (writable side)
socket.write(`Echo: ${data}`);
});
socket.on('end', () => {
console.log('Client disconnected');
});
});
server.listen(8080, () => {
console.log('Server listening on port 8080');
});
// To test, you can use a tool like netcat or telnet:
// $ nc localhost 8080
// or create a client:
/*
const client = net.connect({ port: 8080 }, () => {
console.log('Connected to server');
client.write('Hello from client!');
});
client.on('data', (data) => {
console.log('Server says:', data.toString());
client.end(); // Close the connection
});
*/
धाराओं को रूपांतरित करें
ट्रांसफ़ॉर्म स्ट्रीम डुप्लेक्स स्ट्रीम हैं जो डेटा को अपने पथ से गुजरते समय रूपांतरित कर सकती हैं।
वे पाइपों पर डेटा संसाधित करने के लिए आदर्श हैं।
const { Transform } = require('stream');
const fs = require('fs');
// Create a transform stream that converts text to uppercase
class UppercaseTransform extends Transform {
_transform(chunk, encoding, callback) {
// Transform the chunk to uppercase
const upperChunk = chunk.toString().toUpperCase();
// Push the transformed data
this.push(upperChunk);
// Signal that we're done with this chunk
callback();
}
}
// Create an instance of our transform stream
const uppercaseTransform = new UppercaseTransform();
// Create a readable stream from a file
const readableStream = fs.createReadStream('input.txt');
// Create a writable stream to a file
const writableStream = fs.createWriteStream('output-uppercase.txt');
// Pipe the data through our transform stream
readableStream
.pipe(uppercaseTransform)
.pipe(writableStream)
.on('finish', () => {
console.log('Transformation completed!');
});
स्ट्रीम इवेंट
सभी स्ट्रीम इवेंटएमिटर के उदाहरण हैं और कई इवेंट उत्सर्जित करते हैं:
पठनीय स्ट्रीम घटनाएँ
- data:स्ट्रीम में कदम रखने के लिए डेटा उपलब्ध होने पर उत्सर्जित होता है
- end:उत्सर्जित तब होता है जब उपभोग करने के लिए कोई डेटा नहीं होता है
- error:यदि चरण के दौरान कोई त्रुटि होती है तो उत्सर्जित किया जाता है
- close:यदि स्ट्रीम का अंतर्निहित संसाधन बंद है तो उत्सर्जित होता है
- readable:जब डेटा प्रसंस्करण के लिए उपलब्ध होता है तो उत्सर्जित होता है
लिखने योग्य स्ट्रीम इवेंट
- drain:यह तब उत्सर्जित होता है जब राइट() विधि गलत रिटर्न के बाद स्ट्रीम अधिक डेटा स्वीकार करने के लिए तैयार होती है
- finish:बेस सिस्टम में फ्लश होने पर सारा डेटा उत्सर्जित हो जाता है
- error:लिखते समय कोई त्रुटि होने पर उत्सर्जित
- close:यदि स्ट्रीम का अंतर्निहित संसाधन बंद है तो उत्सर्जित होता है
- pipe:जब पाइप() विधि को पठनीय स्ट्रीम पर बुलाया जाता है तो उत्सर्जित होता है
- unpipe:जब अनपाइप() विधि को पठनीय स्ट्रीम पर बुलाया जाता है तो उत्सर्जित होता है
stream.pipeline() तरीका
पाइपलाइन() फ़ंक्शन (Node.js v10.0.0 के बाद से उपलब्ध) स्ट्रीम को एक साथ श्रृंखलाबद्ध करने का एक अधिक मजबूत तरीका है, विशेष रूप से त्रुटि प्रबंधन के लिए।
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
// Create a pipeline that handles errors properly
pipeline(
fs.createReadStream('source.txt'),
zlib.createGzip(),
fs.createWriteStream('destination.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded!');
}
}
);
नोट:
यदि उनमें से कोई भी त्रुटि हो तो पाइपलाइन() सभी स्ट्रीम को ठीक से साफ़ कर देगी, जिससे संभावित मेमोरी लीक को रोका जा सकेगा।
ऑब्जेक्ट मोड स्ट्रीम
डिफ़ॉल्ट रूप से, स्ट्रीम स्ट्रिंग्स और बफ़र ऑब्जेक्ट के साथ काम करती हैं।
हालाँकि, जावास्क्रिप्ट ऑब्जेक्ट के साथ काम करने के लिए स्ट्रीम को 'ऑब्जेक्ट मोड' पर सेट किया जा सकता है।
const { Readable, Writable, Transform } = require('stream');
// Create a readable stream in object mode
const objectReadable = new Readable({
objectMode: true,
read() {} // Implementation required but can be no-op
});
// Create a transform stream in object mode
const objectTransform = new Transform({
objectMode: true,
transform(chunk, encoding, callback) {
// Add a property to the object
chunk.transformed = true;
chunk.timestamp = new Date();
this.push(chunk);
callback();
}
});
// Create a writable stream in object mode
const objectWritable = new Writable({
objectMode: true,
write(chunk, encoding, callback) {
console.log('Received object:', chunk);
callback();
}
});
// Connect the streams
objectReadable
.pipe(objectTransform)
.pipe(objectWritable);
// Push some objects to the stream
objectReadable.push({ name: 'Object 1', value: 10 });
objectReadable.push({ name: 'Object 2', value: 20 });
objectReadable.push({ name: 'Object 3', value: 30 });
objectReadable.push(null); // Signal the end of data
उन्नत स्ट्रीम विधियाँ
1. पाइपलाइन के साथ त्रुटि प्रबंधन()
स्ट्रीम श्रृंखलाओं में त्रुटियों को संभालने के लिए पाइपलाइन() विधि अनुशंसित तरीका है:
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
pipeline(
fs.createReadStream('input.txt'),
zlib.createGzip(),
fs.createWriteStream('output.txt.gz'),
(err) => {
if (err) {
console.error('Pipeline failed:', err);
} else {
console.log('Pipeline succeeded');
}
}
);
2. ऑब्जेक्ट मोड स्ट्रीम
स्ट्रीम जावास्क्रिप्ट ऑब्जेक्ट के साथ काम कर सकती हैं, न कि केवल स्ट्रिंग और बफ़र्स के साथ:
const { Readable } = require('stream');
// Create a readable stream in object mode
const objectStream = new Readable({
objectMode: true,
read() {}
});
// Push objects to the stream
objectStream.push({ id: 1, name: 'Alice' });
objectStream.push({ id: 2, name: 'Bob' });
objectStream.push(null); // Signal end of stream
// Consume the stream
objectStream.on('data', (obj) => {
console.log('Received:', obj);
});
व्यावहारिक उदाहरण
HTTP स्ट्रीमिंग
HTTP अनुरोधों और प्रतिक्रियाओं में स्ट्रीम का व्यापक रूप से उपयोग किया जाता है।
const http = require('http');
const fs = require('fs');
// Create an HTTP server
const server = http.createServer((req, res) => {
// Handle different routes
if (req.url === '/') {
// Send a simple response
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end('Stream Demo
Try streaming a file or streaming a video.
');
}
else if (req.url === '/file') {
// Stream a large text file
res.writeHead(200, { 'Content-Type': 'text/plain' });
const fileStream = fs.createReadStream('largefile.txt', 'utf8');
// Pipe the file to the response (handles backpressure automatically)
fileStream.pipe(res);
// Handle errors
fileStream.on('error', (err) => {
console.error('File stream error:', err);
res.statusCode = 500;
res.end('Server Error');
});
}
else if (req.url === '/video') {
// Stream a video file with proper headers
const videoPath = 'video.mp4';
const stat = fs.statSync(videoPath);
const fileSize = stat.size;
const range = req.headers.range;
if (range) {
// Handle range requests for video seeking
const parts = range.replace(/bytes=/, "").split("-");
const start = parseInt(parts[0], 10);
const end = parts[1] ? parseInt(parts[1], 10) : fileSize - 1;
const chunksize = (end - start) + 1;
const videoStream = fs.createReadStream(videoPath, { start, end });
res.writeHead(206, {
'Content-Range': `bytes ${start}-${end}/${fileSize}`,
'Accept-Ranges': 'bytes',
'Content-Length': chunksize,
'Content-Type': 'video/mp4'
});
videoStream.pipe(res);
} else {
// No range header, send entire video
res.writeHead(200, {
'Content-Length': fileSize,
'Content-Type': 'video/mp4'
});
fs.createReadStream(videoPath).pipe(res);
}
} else {
// 404 Not Found
res.writeHead(404, { 'Content-Type': 'text/plain' });
res.end('Not Found');
}
});
// Start the server
server.listen(8080, () => {
console.log('Server running at http://localhost:8080/');
});
बड़ी CSV फ़ाइलें संसाधित करना
const fs = require('fs');
const { Transform } = require('stream');
const csv = require('csv-parser'); // npm install csv-parser
// Create a transform stream to filter and transform CSV data
const filterTransform = new Transform({
objectMode: true,
transform(row, encoding, callback) {
// Only pass through rows that meet our criteria
if (parseInt(row.age) > 18) {
// Modify the row
row.isAdult = 'Yes';
// Push the transformed row
this.push(row);
}
callback();
}
});
// Create a writable stream for the results
const results = [];
const writeToArray = new Transform({
objectMode: true,
transform(row, encoding, callback) {
results.push(row);
callback();
}
});
// Create the processing pipeline
fs.createReadStream('people.csv')
.pipe(csv())
.pipe(filterTransform)
.pipe(writeToArray)
.on('finish', () => {
console.log(`Processed ${results.length} records:`);
console.log(results);
})
.on('error', (err) => {
console.error('Error processing CSV:', err);
});
सर्वोत्तम अभ्यास
चेतावनी:
स्ट्रीम को गलत तरीके से संभालने से मेमोरी लीक और प्रदर्शन संबंधी समस्याएं हो सकती हैं।
हमेशा त्रुटियों को संभालें और स्ट्रीम को ठीक से समाप्त करें।
सारांश
Node.js में स्ट्रीम एक मौलिक अवधारणा है जो खुले डेटा हेरफेर की अनुमति देती है। वे हैं:
- वे मेमोरी में सब कुछ लोड किए बिना डेटा को टुकड़े-टुकड़े में प्रोसेस करते हैं
- वे बड़े डेटा सेट के लिए बेहतर मेमोरी क्षमता प्रदान करते हैं
- सभी डेटा उपलब्ध होने से पहले प्रसंस्करण शुरू करने की अनुमति दें
- शक्तिशाली डेटा प्रोसेसिंग पाइपलाइन चलाएँ
- Core Node.js API का व्यापक रूप से उपयोग किया जाता है